package com.atguigu.flink.splitAndUnionStream;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class innnerJoinByConnect {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(
                Tuple2.of(1, "a1"),
                Tuple2.of(1, "a2"),
                Tuple2.of(2, "b"),
                Tuple2.of(3, "c")
        );

        DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(
                Tuple3.of(1, "aa1", 1),
                Tuple3.of(1, "aa2", 2),
                Tuple3.of(2, "bb", 1),
                Tuple3.of(3, "cc", 1)
        );

        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);

        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> keyedDS = connect.keyBy(0, 0);

        SingleOutputStreamOperator<String> processDS = keyedDS.process(
                new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {

                    HashMap<Integer, List<Tuple2<Integer, String>>> ds1Cache = new HashMap<>();
                    HashMap<Integer, List<Tuple3<Integer, String, Integer>>> ds2Cache = new HashMap<>();

                    @Override
                    public void processElement1(Tuple2<Integer, String> tup2, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.Context context, Collector<String> collector) throws Exception {
                        Integer id = tup2.f0;
                        if (ds1Cache.containsKey(id)) {
                            ds1Cache.get(id).add(tup2);
                        } else {
                            List<Tuple2<Integer, String>> ds1List = new ArrayList<>();
                            ds1List.add(tup2);
                            ds1Cache.put(id, ds1List);
                        }

                        if (ds2Cache.containsKey(id)) {
                            for (Tuple3<Integer, String, Integer> tup3 : ds2Cache.get(id)) {
                                collector.collect(tup2 + "---" + tup3);
                            }
                        }
                    }

                    @Override
                    public void processElement2(Tuple3<Integer, String, Integer> tup3, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.Context context, Collector<String> collector) throws Exception {
                        Integer id = tup3.f0;
                        if (ds2Cache.containsKey(id)) {
                            ds2Cache.get(id).add(tup3);
                        } else {
                            List<Tuple3<Integer, String, Integer>> ds2List = new ArrayList<>();
                            ds2List.add(tup3);
                            ds2Cache.put(id, ds2List);
                        }

                        if (ds1Cache.containsKey(id)) {
                            for (Tuple2<Integer, String> tup2 : ds1Cache.get(id)) {
                                collector.collect(tup3 + "---" + tup2);
                            }
                        }
                    }
                }
        );

        processDS.print();

        env.execute();

    }

}
